KinesisデータストリームをLambdaで処理する時のエラー制御方法をまとめてみた
Kinesis Data Streams/DynamoDB Streamsのようなストリームを安定して処理するには、適切なエラー処理が重要です。
本記事では
- 不正な入力データによるリトライ
- バッチ処理時に一部のレコードが原因でバッチ全体をリトライ
- 正しく処理できなかったレコードを通知
といった異常系処理を制御する方法を紹介します。
前提として、ワーカーはLambdaで実装し、Kinesisとは EventBridge Pipes、または、Lambdaイベントソースマッピングで連携しているものとします。
データソースがDynamoDB Streamsの場合も、同様のことが成り立つと思われます。
不正な入力データによるリトライを制御
一時的な障害に対してはリトライが有効ですが、不具合・不正データによるエラーが発生している場合、リトライしても処理は成功しません。 また、一時的な障害が長引いた場合は、適切なタイミングで諦めることが必要です。
例えば、EventBridge Pipesの場合、デフォルトでは24時間、または、リトライ185回のどちらかの条件を満たすまで再試行します。速やかに再試行を諦めるには、以下の値をカスタマイズします。
- リトライ数(Retry attempts)
- イベントの経過時間(Maximum age of event/record)
EventBridge Pipesの場合、Pipe Settingsで設定します。
Lambda Event Sourceの場合、Trigger configurationのAdditional settingsで設定します。
試しに意図的にエラーを発生させます。
デフォルト設定では、延々とリトライを繰り返します。また、exponential backoffにより、リトライ間隔が伸びていることもわかります。
リトライ | 実行時刻 | 間隔(秒) |
---|---|---|
0 | 7.123 | |
1 | 7.374 | 0.251 |
2 | 7.893 | 0.519 |
3 | 8.890 | 0.997 |
4 | 10.922 | 2.032 |
5 | 14.573 | 3.651 |
6 | 19.773 | 5.200 |
7 | 34.525 | 14.752 |
次に、リトライ数を2にして再度エラーを発生させます。
初回呼び出しでエラーが起きた後、2回再試行し、合計3回実行して処理を終えました。
バッチ処理時に一部のレコードが原因でバッチ全体をリトライされるのを防ぐ
バッチ処理時に一部のレコードが原因でバッチ全体が再実行されるのを防ぎたい場合
- 二分探索法(bisect)でバッチを分割し、エラーを誘発するレコードを含むバッチを絞り込む
- バッチ内で失敗したレコード(のシーケンス)を特定し、失敗箇所からやり直す
の2種類のアプローチがあります。
前者はアプリケーション側の作り込みなしに、Kinesisの基盤側でエラーを絞り込んでくれますが、同じレコードに対して何度も再実行される可能性があり、バッチサイズが大きい場合は無駄が多くなります。
後者はアプリケーション側で軽微な作り込みが伴うため、複雑さが増す一方で、リトライ時にはエラー箇所から再実行され、先行する処理に成功したレコードは再実行されないため、無駄が減ります。
実装のシンプルさを優先するなら前者、処理の効率を重視するなら後者です。
この動作を確認します。
最大リトライ数を1に設定し、不正なレコードを3つ目に含む4つのデータを渡します。
$ cat test.json [ { "Data": "0", "PartitionKey": "0" }, { "Data": "1", "PartitionKey": "1" }, { "Data": "c", "PartitionKey": "c" }, { "Data": "3", "PartitionKey": "3" } ] $ aws kinesis put-records \ --stream-name foo \ --cli-binary-format \ raw-in-base64-out \ --records file://test.json { "FailedRecordCount": 0, "Records": [ { "SequenceNumber": "49640346564938479822052999557991794161517120249185435650", "ShardId": "shardId-000000000000" }, { "SequenceNumber": "49640346564938479822052999557993003087336734878360141826", "ShardId": "shardId-000000000000" }, { "SequenceNumber": "49640346564938479822052999557994212013156349507534848002", "ShardId": "shardId-000000000000" }, { "SequenceNumber": "49640346564938479822052999557995420938975964136709554178", "ShardId": "shardId-000000000000" } ] }
1. 二分探索法(bisect)でバッチを分割し、エラーを誘発するレコードを含むバッチを絞り込む
bisectオプションを有効にすると、エラー発生時にバッチを2分割して再実行します。
アプリケーションの作り込みなしにKinesis の基盤側でエラーを絞り込んでくれる一方で、同じレコードに対して何度も再実行される可能性があり、バッチサイズが大きい場合は無駄が大きくなるリスクがあります。
bisectは一度正常に処理したメッセージも再処理するため、ワーカーは冪等に実装する必要があります。
EventBridge Pipesの場合、Source設定のAdditional settingsのOn partial batch item failureをAUTOMATIC_BISECT
に設定します。
Lambda Event Sourceの場合、Trigger configurationのAdditional settingsのSplit batch on errorをチェックします。
Pipes向けに以下のようなConsumerを用意し、動作確認します。
import base64 import json def lambda_handler3(event, context): print([base64.b64decode(record['data']) for record in event]) for record in event: data = base64.b64decode(record['data']) int(data)
エラーが起きる度に、バッチを2分割して再実行していることがわかります。
2. バッチ内で失敗したレコード(のシーケンス)を特定し、失敗箇所からやり直す
Lambdaのレスポンスに失敗したレコード情報を batchItemFailures
をキーにして返すと、失敗位置から再実行されます。
EventBridge Pipesの場合、デフォルトで有効です。
Lambda Event Sourceの場合、Trigger configurationのAdditional settingsのReport batch item failuresをチェックします。
Pipes向け実装例
import base64 import json def lambda_handler(event, context): print([base64.b64decode(record['data']) for record in event]) for record in event: try: data = base64.b64decode(record['data']) int(data) except: result = {"batchItemFailures" : [{'itemIdentifier' : record['sequenceNumber']}]} print(result) return result return {"batchItemFailures":[]}
Event Source向け実装例
import base64 import json def lambda_handler(event, context): print([base64.b64decode(record['kinesis']['data']) for record in event['Records']]) for record in event['Records']: data = base64.b64decode(record['kinesis']['data']) try: int(data) except: result = {"batchItemFailures" : [{'itemIdentifier' : record['kinesis']['sequenceNumber']}]} print(result) return result
Kinesis は時系列(シーケンス番号)順に処理するため、処理の失敗を検知したらバッチ全体を処理せずに、すぐにレスポンスを返してしまって問題有りません。 仮にレスポンスに複数のシーケンス番号が含まれていても、リトライ時には、一番小さなシーケンス番号から再試行されます。
If the batchItemFailures array contains multiple items, Lambda uses the record with the lowest sequence number as the checkpoint. Lambda then retries all records starting from that checkpoint.
このオプションを有効にすると、エラーの起きたレコードから再実行されますが、リトライの上限に達すると、同じバッチの他のレコードも道連れにして処理を終了します。
※4レコード目の「3」が処理されていません
bisect オプションも有効にすることで、エラー箇所から再実行し、更に、異常レコードだけピンポイントに処理をスキップできます。
CloudWatch メトリクス
EventBridge Pipesの場合、エラー発生時にメトリクスが生成され、ワーカーのエラーハンドリングによって生成されるメトリクスが異なります。
- TargetStageFailed (
batchItemFailures
を利用しなかった場合) - TargetStagePartiallyFailed (
batchItemFailures
を利用した場合)
ご注意ください。
正しく処理できなかったレコードを通知
デッドレターキュー(DLQ)を利用すると、処理に失敗したメッセージを通知できます。
EventBridge Pipesの場合、Pipe settingsのDead-letter queueで設定します。
Lambda Event Sourceの場合、On-failure destinationで設定します。
Pipes DLQのメッセージ例
{ "context": { "partnerResourceArn": "arn:aws:pipes:eu-central-1:12345:pipe/kine_pipe", "condition": "RetryAttemptsExhausted" }, "version": "1.0", "timestamp": "2023-05-01T10:52:00.579Z", "KinesisBatchInfo": { "shardId": "shardId-000000000000", "startSequenceNumber": "49640346564938479822052999558037733342662992515972464642", "endSequenceNumber": "49640346564938479822052999558037733342662992515972464642", "approximateArrivalOfFirstRecord": "2023-05-01T10:51:59.005Z", "approximateArrivalOfLastRecord": "2023-05-01T10:51:59.005Z", "batchSize": 1, "streamArn": "arn:aws:kinesis:eu-central-1:12345:stream/foo" } }
このメッセージには
- 通知された原因(
condition
) - 対象のストリーム(
streamArn
) - 対象のシャード(
shardId
) - 対象のシーケンス番号(
startSequenceNumber
)
といった情報が含まれています
DLQメッセージから、エラーを引き起こしたレコードを取得する方法については、次の記事を参照ください。
まとめ
Amazon Kinesis Data StreamsとLambdaの組み合わせを例に、ストリーム処理の異常処理方法をまとめました。 DynamoDB StreamsやLambda以外のワーカーを利用する場合も、考え方は同じです。
- リトライ数
- エラー範囲を絞り込むbisect
- エラーレコードを通知するDLQ
は無条件で設定し、部分成功(batchItemFailures
)は余力があれば対応しましょう。
更に、ワーカーの処理は冪等に実装しましょう。
運用周りは後回しにしがちですが、早い段階でエンドツーエンドでデータを連携させ、エラー処理も仕込んでおくと、開発効率も向上すると思います。
それでは。